1 package org.apache.maven.surefire.junitcore.pc;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.lang.annotation.Annotation;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.EnumMap;
28 import java.util.Iterator;
29 import java.util.LinkedHashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Set;
33 import java.util.concurrent.ExecutorService;
34 import java.util.concurrent.Executors;
35 import java.util.concurrent.ThreadFactory;
36
37 import org.apache.maven.surefire.junitcore.JUnitCoreParameters;
38 import org.apache.maven.surefire.report.ConsoleLogger;
39 import org.apache.maven.surefire.testset.TestSetFailedException;
40 import org.apache.maven.surefire.util.internal.DaemonThreadFactory;
41 import org.junit.internal.runners.ErrorReportingRunner;
42 import org.junit.runner.Description;
43 import org.junit.runner.Runner;
44 import org.junit.runner.manipulation.Filter;
45 import org.junit.runner.manipulation.NoTestsRemainException;
46 import org.junit.runner.notification.RunNotifier;
47 import org.junit.runners.ParentRunner;
48 import org.junit.runners.Suite;
49 import org.junit.runners.model.InitializationError;
50 import org.junit.runners.model.RunnerBuilder;
51
52 import static org.apache.maven.surefire.junitcore.pc.ParallelComputerUtil.resolveConcurrency;
53 import static org.apache.maven.surefire.junitcore.pc.Type.CLASSES;
54 import static org.apache.maven.surefire.junitcore.pc.Type.METHODS;
55 import static org.apache.maven.surefire.junitcore.pc.Type.SUITES;
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81 public final class ParallelComputerBuilder
82 {
83 private static final ThreadFactory DAEMON_THREAD_FACTORY = DaemonThreadFactory.newDaemonThreadFactory();
84
85 private static final Class<? extends Annotation> JCIP_NOT_THREAD_SAFE = loadNotThreadSafeAnnotations();
86
87 private static final Set<?> NULL_SINGLETON = Collections.singleton( null );
88
89 static final int TOTAL_POOL_SIZE_UNDEFINED = 0;
90
91 private final Map<Type, Integer> parallelGroups = new EnumMap<Type, Integer>( Type.class );
92
93 private final ConsoleLogger logger;
94
95 private boolean useSeparatePools;
96
97 private int totalPoolSize;
98
99 private JUnitCoreParameters parameters;
100
101 private boolean optimize;
102
103 private boolean runningInTests;
104
105
106
107
108
109
/**
 * Creates a builder that has no {@link JUnitCoreParameters}.
 * The builder is flagged as running in tests, uses separate pools and starts
 * with zero threads for suites, classes and methods.
 *
 * @param logger logger passed to the schedulers created by the computer
 */
ParallelComputerBuilder( ConsoleLogger logger )
{
    this.logger = logger;
    runningInTests = true;

    // Default configuration: nothing runs in parallel until a thread count is registered.
    parallelGroups.put( SUITES, 0 );
    parallelGroups.put( CLASSES, 0 );
    parallelGroups.put( METHODS, 0 );
    useSeparatePools();
}
119
/**
 * Creates a builder configured by provider parameters. Unlike the logger-only
 * constructor, the builder is not flagged as running in tests.
 *
 * @param logger     logger passed to the schedulers created by the computer
 * @param parameters parameters used for timeouts and for resolving thread counts
 */
public ParallelComputerBuilder( ConsoleLogger logger, JUnitCoreParameters parameters )
{
    this( logger );
    this.parameters = parameters;
    runningInTests = false;
}
126
127 public ParallelComputer buildComputer()
128 {
129 return new PC();
130 }
131
132 ParallelComputerBuilder useSeparatePools()
133 {
134 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
135 useSeparatePools = true;
136 return this;
137 }
138
139 ParallelComputerBuilder useOnePool()
140 {
141 totalPoolSize = TOTAL_POOL_SIZE_UNDEFINED;
142 useSeparatePools = false;
143 return this;
144 }
145
146
147
148
149
150
151
152 ParallelComputerBuilder useOnePool( int totalPoolSize )
153 {
154 if ( totalPoolSize < 1 )
155 {
156 throw new IllegalArgumentException( "Size of common pool is less than 1." );
157 }
158 this.totalPoolSize = totalPoolSize;
159 useSeparatePools = false;
160 return this;
161 }
162
163 boolean isOptimized()
164 {
165 return optimize;
166 }
167
168 ParallelComputerBuilder optimize( boolean optimize )
169 {
170 this.optimize = optimize;
171 return this;
172 }
173
174 ParallelComputerBuilder parallelSuites()
175 {
176 return parallel( SUITES );
177 }
178
179 ParallelComputerBuilder parallelSuites( int nThreads )
180 {
181 return parallel( nThreads, SUITES );
182 }
183
184 ParallelComputerBuilder parallelClasses()
185 {
186 return parallel( CLASSES );
187 }
188
189 ParallelComputerBuilder parallelClasses( int nThreads )
190 {
191 return parallel( nThreads, CLASSES );
192 }
193
194 ParallelComputerBuilder parallelMethods()
195 {
196 return parallel( METHODS );
197 }
198
199 ParallelComputerBuilder parallelMethods( int nThreads )
200 {
201 return parallel( nThreads, METHODS );
202 }
203
204 private ParallelComputerBuilder parallel( int nThreads, Type parallelType )
205 {
206 if ( nThreads < 0 )
207 {
208 throw new IllegalArgumentException( "negative nThreads " + nThreads );
209 }
210
211 if ( parallelType == null )
212 {
213 throw new NullPointerException( "null parallelType" );
214 }
215
216 parallelGroups.put( parallelType, nThreads );
217 return this;
218 }
219
220 private ParallelComputerBuilder parallel( Type parallelType )
221 {
222 return parallel( Integer.MAX_VALUE, parallelType );
223 }
224
225 private double parallelTestsTimeoutInSeconds()
226 {
227 return parameters == null ? 0d : parameters.getParallelTestsTimeoutInSeconds();
228 }
229
230 private double parallelTestsTimeoutForcedInSeconds()
231 {
232 return parameters == null ? 0d : parameters.getParallelTestsTimeoutForcedInSeconds();
233 }
234
235 @SuppressWarnings( "unchecked" )
236 private static Class<? extends Annotation> loadNotThreadSafeAnnotations()
237 {
238 try
239 {
240 Class c = Class.forName( "net.jcip.annotations.NotThreadSafe" );
241 return c.isAnnotation() ? (Class<? extends Annotation>) c : null;
242 }
243 catch ( ClassNotFoundException e )
244 {
245 return null;
246 }
247 }
248
249 final class PC
250 extends ParallelComputer
251 {
252 private final SingleThreadScheduler notThreadSafeTests =
253 new SingleThreadScheduler( ParallelComputerBuilder.this.logger );
254
255 final Collection<ParentRunner> suites = new LinkedHashSet<ParentRunner>();
256
257 final Collection<ParentRunner> nestedSuites = new LinkedHashSet<ParentRunner>();
258
259 final Collection<ParentRunner> classes = new LinkedHashSet<ParentRunner>();
260
261 final Collection<ParentRunner> nestedClasses = new LinkedHashSet<ParentRunner>();
262
263 final Collection<Runner> notParallelRunners = new LinkedHashSet<Runner>();
264
265 int poolCapacity;
266
267 boolean splitPool;
268
269 private final Map<Type, Integer> allGroups;
270
271 private long nestedClassesChildren;
272
273 private volatile Scheduler master;
274
/**
 * Takes a snapshot of the builder's configuration: timeouts, thread counts,
 * total pool size and the separate-pools flag.
 */
private PC()
{
    super( parallelTestsTimeoutInSeconds(), parallelTestsTimeoutForcedInSeconds() );
    splitPool = ParallelComputerBuilder.this.useSeparatePools;
    poolCapacity = ParallelComputerBuilder.this.totalPoolSize;
    // A copy, so that later changes of the builder do not affect this computer.
    allGroups = new EnumMap<Type, Integer>( ParallelComputerBuilder.this.parallelGroups );
}
282
283 @Override
284 protected ShutdownResult describeStopped( boolean shutdownNow )
285 {
286 ShutdownResult shutdownResult = notThreadSafeTests.describeStopped( shutdownNow );
287 final Scheduler m = master;
288 if ( m != null )
289 {
290 ShutdownResult shutdownResultOfMaster = m.describeStopped( shutdownNow );
291 shutdownResult.getTriggeredTests().addAll( shutdownResultOfMaster.getTriggeredTests() );
292 shutdownResult.getIncompleteTests().addAll( shutdownResultOfMaster.getIncompleteTests() );
293 }
294 return shutdownResult;
295 }
296
297 @Override
298 boolean shutdownThreadPoolsAwaitingKilled()
299 {
300 boolean notInterrupted = notThreadSafeTests.shutdownThreadPoolsAwaitingKilled();
301 final Scheduler m = master;
302 if ( m != null )
303 {
304 notInterrupted &= m.shutdownThreadPoolsAwaitingKilled();
305 }
306 return notInterrupted;
307 }
308
309 @Override
310 public Runner getSuite( RunnerBuilder builder, Class<?>[] cls )
311 throws InitializationError
312 {
313 try
314 {
315 super.getSuite( builder, cls );
316 populateChildrenFromSuites();
317
318 WrappedRunners suiteSuites = wrapRunners( suites );
319 WrappedRunners suiteClasses = wrapRunners( classes );
320
321 long suitesCount = suites.size();
322 long classesCount = classes.size() + nestedClasses.size();
323 long methodsCount = suiteClasses.embeddedChildrenCount + nestedClassesChildren;
324 if ( !ParallelComputerBuilder.this.runningInTests )
325 {
326 determineThreadCounts( suitesCount, classesCount, methodsCount );
327 }
328
329 return setSchedulers( suiteSuites.wrappingSuite, suiteClasses.wrappingSuite );
330 }
331 catch ( TestSetFailedException e )
332 {
333 throw new InitializationError( Collections.<Throwable>singletonList( e ) );
334 }
335 }
336
337 @Override
338 protected Runner getRunner( RunnerBuilder builder, Class<?> testClass )
339 throws Throwable
340 {
341 Runner runner = super.getRunner( builder, testClass );
342 if ( canSchedule( runner ) )
343 {
344 if ( !isThreadSafe( runner ) )
345 {
346 ( ( ParentRunner ) runner ).setScheduler( notThreadSafeTests.newRunnerScheduler() );
347 notParallelRunners.add( runner );
348 }
349 else if ( runner instanceof Suite )
350 {
351 suites.add( (Suite) runner );
352 }
353 else
354 {
355 classes.add( (ParentRunner) runner );
356 }
357 }
358 else
359 {
360 notParallelRunners.add( runner );
361 }
362 return runner;
363 }
364
365 private void determineThreadCounts( long suites, long classes, long methods )
366 throws TestSetFailedException
367 {
368 final JUnitCoreParameters parameters = ParallelComputerBuilder.this.parameters;
369 final boolean optimize = ParallelComputerBuilder.this.optimize;
370 RunnerCounter counts = new RunnerCounter( suites, classes, methods );
371 Concurrency concurrency = resolveConcurrency( parameters, optimize ? counts : null );
372 allGroups.put( SUITES, concurrency.suites );
373 allGroups.put( CLASSES, concurrency.classes );
374 allGroups.put( METHODS, concurrency.methods );
375 poolCapacity = concurrency.capacity;
376 splitPool &= concurrency.capacity <= 0;
377 }
378
379 private <T extends Runner> WrappedRunners wrapRunners( Collection<T> runners )
380 throws InitializationError
381 {
382
383 long childrenCounter = 0;
384 ArrayList<Runner> runs = new ArrayList<Runner>();
385 for ( T runner : runners )
386 {
387 if ( runner != null )
388 {
389 int children = countChildren( runner );
390 childrenCounter += children;
391 if ( children != 0 )
392 {
393 runs.add( runner );
394 }
395 }
396 }
397 return runs.isEmpty() ? new WrappedRunners() : new WrappedRunners( createSuite( runs ), childrenCounter );
398 }
399
400 private int countChildren( Runner runner )
401 {
402 Description description = runner.getDescription();
403 Collection children = description == null ? null : description.getChildren();
404 return children == null ? 0 : children.size();
405 }
406
407 private ExecutorService createPool( int poolSize )
408 {
409 return poolSize < Integer.MAX_VALUE
410 ? Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY )
411 : Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
412 }
413
414 private Scheduler createMaster( ExecutorService pool, int poolSize )
415 {
416 final int finalRunnersCounter = countFinalRunners();
417 final SchedulingStrategy strategy;
418 if ( finalRunnersCounter <= 1 || poolSize <= 1 )
419 {
420 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
421 }
422 else if ( pool != null && poolSize == Integer.MAX_VALUE )
423 {
424 strategy = new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool );
425 }
426 else
427 {
428 strategy = SchedulingStrategies.createParallelStrategy( ParallelComputerBuilder.this.logger,
429 finalRunnersCounter );
430 }
431 return new Scheduler( ParallelComputerBuilder.this.logger, null, strategy );
432 }
433
434 private int countFinalRunners()
435 {
436 int counter = notParallelRunners.isEmpty() ? 0 : 1;
437
438 if ( !suites.isEmpty() && allGroups.get( SUITES ) > 0 )
439 {
440 ++counter;
441 }
442
443 if ( !classes.isEmpty() && allGroups.get( CLASSES ) > 0 )
444 {
445 ++counter;
446 }
447
448 return counter;
449 }
450
451 private void populateChildrenFromSuites()
452 {
453
454 Filter filter = new SuiteFilter();
455 for ( Iterator<ParentRunner> it = suites.iterator(); it.hasNext(); )
456 {
457 ParentRunner suite = it.next();
458 try
459 {
460 suite.filter( filter );
461 }
462 catch ( NoTestsRemainException e )
463 {
464 it.remove();
465 }
466 }
467 }
468
469 private int totalPoolSize()
470 {
471 if ( poolCapacity == TOTAL_POOL_SIZE_UNDEFINED )
472 {
473 int total = 0;
474 for ( int nThreads : allGroups.values() )
475 {
476 total += nThreads;
477 if ( total < 0 )
478 {
479 total = Integer.MAX_VALUE;
480 break;
481 }
482 }
483 return total;
484 }
485 else
486 {
487 return poolCapacity;
488 }
489 }
490
491 private Runner setSchedulers( ParentRunner suiteSuites, ParentRunner suiteClasses )
492 throws InitializationError
493 {
494 int parallelSuites = allGroups.get( SUITES );
495 int parallelClasses = allGroups.get( CLASSES );
496 int parallelMethods = allGroups.get( METHODS );
497 int poolSize = totalPoolSize();
498 ExecutorService commonPool = splitPool || poolSize == 0 ? null : createPool( poolSize );
499 master = createMaster( commonPool, poolSize );
500
501 if ( suiteSuites != null )
502 {
503
504 if ( commonPool != null && parallelSuites > 0 )
505 {
506 Balancer balancer = BalancerFactory.createBalancerWithFairness( parallelSuites );
507 suiteSuites.setScheduler( createScheduler( null, commonPool, true, balancer ) );
508 }
509 else
510 {
511 suiteSuites.setScheduler( createScheduler( parallelSuites ) );
512 }
513 }
514
515
516 ArrayList<ParentRunner> allSuites = new ArrayList<ParentRunner>( suites );
517 allSuites.addAll( nestedSuites );
518 if ( suiteClasses != null )
519 {
520 allSuites.add( suiteClasses );
521 }
522 if ( !allSuites.isEmpty() )
523 {
524 setSchedulers( allSuites, parallelClasses, commonPool );
525 }
526
527
528 ArrayList<ParentRunner> allClasses = new ArrayList<ParentRunner>( classes );
529 allClasses.addAll( nestedClasses );
530 if ( !allClasses.isEmpty() )
531 {
532 setSchedulers( allClasses, parallelMethods, commonPool );
533 }
534
535
536 ParentRunner all = createFinalRunner( removeNullRunners(
537 Arrays.<Runner>asList( suiteSuites, suiteClasses, createSuite( notParallelRunners ) )
538 ) );
539 all.setScheduler( master );
540 return all;
541 }
542
543 private ParentRunner createFinalRunner( List<Runner> runners )
544 throws InitializationError
545 {
546 return new Suite( null, runners )
547 {
548 @Override
549 public void run( RunNotifier notifier )
550 {
551 try
552 {
553 beforeRunQuietly();
554 super.run( notifier );
555 }
556 finally
557 {
558 afterRunQuietly();
559 }
560 }
561 };
562 }
563
564 private void setSchedulers( Iterable<? extends ParentRunner> runners, int poolSize, ExecutorService commonPool )
565 {
566 if ( commonPool != null )
567 {
568 Balancer concurrencyLimit = BalancerFactory.createBalancerWithFairness( poolSize );
569 boolean doParallel = poolSize > 0;
570 for ( ParentRunner runner : runners )
571 {
572 runner.setScheduler(
573 createScheduler( runner.getDescription(), commonPool, doParallel, concurrencyLimit ) );
574 }
575 }
576 else
577 {
578 ExecutorService pool = null;
579 if ( poolSize == Integer.MAX_VALUE )
580 {
581 pool = Executors.newCachedThreadPool( DAEMON_THREAD_FACTORY );
582 }
583 else if ( poolSize > 0 )
584 {
585 pool = Executors.newFixedThreadPool( poolSize, DAEMON_THREAD_FACTORY );
586 }
587 boolean doParallel = pool != null;
588 for ( ParentRunner runner : runners )
589 {
590 runner.setScheduler( createScheduler( runner.getDescription(), pool, doParallel,
591 BalancerFactory.createInfinitePermitsBalancer() ) );
592 }
593 }
594 }
595
596 private Scheduler createScheduler( Description desc, ExecutorService pool, boolean doParallel,
597 Balancer concurrency )
598 {
599 doParallel &= pool != null;
600 SchedulingStrategy strategy = doParallel
601 ? new SharedThreadPoolStrategy( ParallelComputerBuilder.this.logger, pool )
602 : new InvokerStrategy( ParallelComputerBuilder.this.logger );
603 return new Scheduler( ParallelComputerBuilder.this.logger, desc, master, strategy, concurrency );
604 }
605
606 private Scheduler createScheduler( int poolSize )
607 {
608 final SchedulingStrategy strategy;
609 if ( poolSize == Integer.MAX_VALUE )
610 {
611 strategy = SchedulingStrategies.createParallelStrategyUnbounded( ParallelComputerBuilder.this.logger );
612 }
613 else if ( poolSize == 0 )
614 {
615 strategy = new InvokerStrategy( ParallelComputerBuilder.this.logger );
616 }
617 else
618 {
619 strategy = SchedulingStrategies.createParallelStrategy( ParallelComputerBuilder.this.logger, poolSize );
620 }
621 return new Scheduler( ParallelComputerBuilder.this.logger, null, master, strategy );
622 }
623
624 private boolean canSchedule( Runner runner )
625 {
626 return !( runner instanceof ErrorReportingRunner ) && runner instanceof ParentRunner;
627 }
628
629 private boolean isThreadSafe( Runner runner )
630 {
631 return runner.getDescription().getAnnotation( JCIP_NOT_THREAD_SAFE ) == null;
632 }
633
634 private class SuiteFilter
635 extends Filter
636 {
637
638
639 @Override
640 public boolean shouldRun( Description description )
641 {
642 return true;
643 }
644
645 @Override
646 public void apply( Object child )
647 throws NoTestsRemainException
648 {
649 super.apply( child );
650 if ( child instanceof ParentRunner )
651 {
652 ParentRunner runner = ( ParentRunner ) child;
653 if ( !isThreadSafe( runner ) )
654 {
655 runner.setScheduler( notThreadSafeTests.newRunnerScheduler() );
656 }
657 else if ( child instanceof Suite )
658 {
659 nestedSuites.add( (Suite) child );
660 }
661 else
662 {
663 ParentRunner parentRunner = (ParentRunner) child;
664 nestedClasses.add( parentRunner );
665 nestedClassesChildren += parentRunner.getDescription().getChildren().size();
666 }
667 }
668 }
669
670 @Override
671 public String describe()
672 {
673 return "";
674 }
675 }
676 }
677
678 private static Suite createSuite( Collection<Runner> runners )
679 throws InitializationError
680 {
681 final List<Runner> onlyRunners = removeNullRunners( runners );
682 return onlyRunners.isEmpty() ? null : new Suite( null, onlyRunners )
683 {
684 };
685 }
686
687 private static List<Runner> removeNullRunners( Collection<Runner> runners )
688 {
689 final List<Runner> onlyRunners = new ArrayList<Runner>( runners );
690 onlyRunners.removeAll( NULL_SINGLETON );
691 return onlyRunners;
692 }
693 }